Skip to content

Conversation

@rescrv
Copy link
Contributor

@rescrv rescrv commented Dec 5, 2025

Description of changes

Idempotent creates of attached functions fail because we check the error
and then throw an error anyway. This PR adds a carve out for that error
condition and adds a test of the case that got us into the state where
this was noticed.

Test plan

CI

Migration plan

N/A

Observability plan

N/A

Documentation Changes

N/A

@github-actions
Copy link

github-actions bot commented Dec 5, 2025

Reviewer Checklist

Please leverage this checklist to ensure your code review is thorough before approving

Testing, Bugs, Errors, Logs, Documentation

  • Can you think of any use case in which the code does not behave as intended? Have they been tested?
  • Can you think of any inputs or external events that could break the code? Is user input validated and safe? Have they been tested?
  • If appropriate, are there adequate property based tests?
  • If appropriate, are there adequate unit tests?
  • Should any logging, debugging, tracing information be added or removed?
  • Are error messages user-friendly?
  • Have all documentation changes needed been made?
  • Have all non-obvious changes been commented?

System Compatibility

  • Are there any potential impacts on other parts of the system or backward compatibility?
  • Does this change intersect with any items on our roadmap, and if so, is there a plan for fitting them together?

Quality

  • Is this code of a unexpectedly high quality (Readability, Modularity, Intuitiveness)

@rescrv rescrv requested review from Copilot and tanujnay112 and removed request for Copilot December 5, 2025 01:03
@propel-code-bot
Copy link
Contributor

propel-code-bot bot commented Dec 5, 2025

Handle idempotent attached function creation across coordinator and DAO

The PR fixes coordinator idempotency when re-attaching functions by ensuring the DAO can retrieve both ready and pending attached functions and by tolerating duplicate insert attempts. A new distributed integration test reproduces the attach-detach-attach sequence to prevent regressions, and Go mocks/interfaces are updated accordingly.

Key Changes

• Add GetReadyOrNotReadyByCollectionID to go/pkg/sysdb/metastore/db/dao/task.go and surface it through IAttachedFunctionDb
• Update go/pkg/sysdb/coordinator/task.go to consult the new DAO method and treat common.ErrAttachedFunctionAlreadyExists as an idempotent success path
• Extend chromadb/test/distributed/test_task_api.py with test_count_function_attach_and_detach_attach_attach covering repeated attach calls after detach
• Refresh generated mocks to include the new DAO method

Affected Areas

go/pkg/sysdb/coordinator/task.go
go/pkg/sysdb/metastore/db/dao/task.go
go/pkg/sysdb/metastore/db/dbmodel/task.go
chromadb/test/distributed/test_task_api.py
go/pkg/sysdb/metastore/db/dbmodel/mocks/IAttachedFunctionDb.go

This summary was automatically generated by @propel-code-bot

Comment on lines +232 to +296
def test_count_function_attach_and_detach_attach_attach(basic_http_client: System) -> None:
"""Test creating and removing a function with the record_counter operator"""
client = ClientCreator.from_system(basic_http_client)
client.reset()

# Create a collection
collection = client.get_or_create_collection(
name="my_document",
metadata={"description": "Sample documents for task processing"},
)

# Create a task that counts records in the collection
attached_fn = collection.attach_function(
name="count_my_docs",
function_id="record_counter", # Built-in operator that counts records
output_collection="my_documents_counts",
params=None,
)

# Verify task creation succeeded
assert attached_fn is not None
initial_version = get_collection_version(client, collection.name)

# Add documents
collection.add(
ids=["doc_{}".format(i) for i in range(0, 300)],
documents=["test document"] * 300,
)

# Verify documents were added
assert collection.count() == 300

wait_for_version_increase(client, collection.name, initial_version)
# Give some time to invalidate the frontend query cache
sleep(60)

result = client.get_collection("my_documents_counts").get("function_output")
assert result["metadatas"] is not None
assert result["metadatas"][0]["total_count"] == 300

# Remove the task
success = attached_fn.detach(
delete_output_collection=True,
)

# Verify task removal succeeded
assert success is True

# Create a task that counts records in the collection
attached_fn = collection.attach_function(
name="count_my_docs",
function_id="record_counter", # Built-in operator that counts records
output_collection="my_documents_counts",
params=None,
)
assert attached_fn is not None

# Create a task that counts records in the collection
attached_fn = collection.attach_function(
name="count_my_docs",
function_id="record_counter", # Built-in operator that counts records
output_collection="my_documents_counts",
params=None,
)
assert attached_fn is not None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommended

[Maintainability] [CodeDuplication] The new test test_count_function_attach_and_detach_attach_attach is very similar to the existing test_count_function_attach_and_detach test in the same file. A significant portion of the code, including setting up the collection, attaching the function, adding data, and verifying the initial run, is duplicated.

To improve maintainability and reduce redundancy, consider refactoring the common setup and execution logic into a helper function that both tests can call.

Context for Agents
[CodeDuplication] The new test `test_count_function_attach_and_detach_attach_attach` is very similar to the existing `test_count_function_attach_and_detach` test in the same file. A significant portion of the code, including setting up the collection, attaching the function, adding data, and verifying the initial run, is duplicated.

To improve maintainability and reduce redundancy, consider refactoring the common setup and execution logic into a helper function that both tests can call.

File: chromadb/test/distributed/test_task_api.py
Line: 296

err = s.catalog.metaDomain.AttachedFunctionDb(txCtx).Insert(attachedFunction)
if err != nil {
if err == common.ErrAttachedFunctionAlreadyExists {
// idempotent fall through
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we verify that the nonready function that exists actually matches the currently requested function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there a bunch of code above that does this?

Copy link
Contributor

@tanujnay112 tanujnay112 Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does that only for functions that are ready. You might be able to reuse the above checks by changing GetByCollectionID to a function called GetAnyByCollectionID that returns deleted (consistency with other GetAny methods) and non-ready functions.

var attachedFunctions []*dbmodel.AttachedFunction
err := s.db.
Where("input_collection_id = ?", inputCollectionID).
Where("is_deleted = ?", false).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically all the other GetAny methods return deleted functions too so this is an inconsistency. I can add this to my to-clean-up list for cleanup day.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning "Deleted" breaks this case. What would you call between GetBy and GetAnyBy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've renamed it ReadyOrNotReady and documented.

existingAttachedFunctions, err := s.catalog.metaDomain.AttachedFunctionDb(txCtx).GetAnyByCollectionID(req.InputCollectionId)
if err != nil {
log.Error("AttachFunction: failed to check for existing attached function", zap.Error(err))
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Re: line +102]

Ok now this error is wrong. If you have more than one ready attached function that's a problem. But it could be possible to have a bunch of partially attached nonready function.

See this comment inline on Graphite.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we allow that? That seems like a buggy state to allow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if they never become ready and are the cumulative result of multiple failed backfill requests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds like we're letting our invariants go loose. If there can only be one, better not make them fight for that distinction.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Invariant is that there can only be one ready function, that reminds me that I need to add validation for that before making a function ready in the commit phase of AttachFunction 2PC.

@blacksmith-sh

This comment has been minimized.

@rescrv rescrv force-pushed the rescrv/coordinator-idempotent branch from 81e1ce2 to 6f1db5f Compare December 5, 2025 23:07
Idempotent creates of attached functions fail because we check the error
and then throw an error anyway.  This PR adds a carve out for that error
condition and adds a test of the case that got us into the state where
this was noticed.

retrieve any (ready/not-ready) task

Rename ReadyOrNotReady
@rescrv rescrv force-pushed the rescrv/coordinator-idempotent branch from 6f1db5f to 6882911 Compare December 5, 2025 23:43
@blacksmith-sh

This comment has been minimized.

existingAttachedFunctions, err := s.catalog.metaDomain.AttachedFunctionDb(txCtx).GetReadyOrNotReadyByCollectionID(req.InputCollectionId)
if err != nil {
log.Error("AttachFunction: failed to check for existing attached function", zap.Error(err))
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Re: line +125]

This line of code should be doing what your change down below intends on doing. I wonder with your addition of GetReadyOrNotReadyByCollectionID whether the right thing to do now is to get rid of the fall-through on line 198.

We also now need to make sure line 121 doesn't error out too early if it happens to read a non-matching unready function first.

See this comment inline on Graphite.

@rescrv rescrv closed this Dec 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants